Azure | 批量获取Azure Blob列表

Author Avatar
Ricardo Liu 1月 15, 2018
  • 在其它设备中阅读本文章

摘要:介绍如何利用Azure Python SDK批量提取Blob的列表。

最近碰到朋友的一个需求,要从Azure Blob Storage中获取所有图片的文件名列表,量级大约在百万左右。

01. GUI工具

由于之前并没有接触过Azure,一时间没有思路。于是做了以下的尝试,但都失败了:

  1. Microsoft官方提供的GUI - Azure Storage Explorer:在GUI界面里只能看到一个Container里的1000个文件,其余就不能加载了。
  2. Azure的Portal:与GUI类似,加载的数量有限,无法批量获取。
  3. 第三方GUI - Azure Management Studio:在网上找到的第三方提供的GUI,功能比Azure Storage Explorer强大很多。右击某个文件夹有一个导出Blob List的选项,但是当文件夹中的文件数量超过一定阈值时,导出会失败。(据我个人推测,与Azure的API每次获取文件的数量有关,超过这个数量就不能导出了)

02. Azure SDK

现有的GUI工具直接获取的思路行不通,于是我开始尝试利用Azure提供的SDK进行爬取。在Azure的官方文档中,How to - Develop部分列出了Azure支持的开发方式,除了网上其他资源主要说的.NET外,还包括JavaC++PythonPHP等几乎所有常见的语言,甚至还支持REST方式(官方样例使用的是C#)。

本着“能写一个脚本解决的事情,绝不搞得太复杂”的原则,选择使用Python SDK进行操作。幸运的是,在Python SDK的介绍中,刚好有一个小节是”List the blobs in a container“,给出的示例代码如下:

generator = block_blob_service.list_blobs('mycontainer')
for blob in generator:
    print(blob.name)

以为可以直接搞定,但是发现返回的结果只有5000张图片。看来介绍里的内容不够完整,于是去阅读Azure Python SDK的文档,找到了关于list_blobs函数的说明

list_blobs(container_name, prefix=None, num_results=None, include=None, delimiter=None, marker=None, timeout=None)

Returns a generator to list the blobs under the specified container. The generator will lazily follow the continuation tokens returned by the service and stop when all blobs have been returned or num_results is reached.

If num_results is specified and the account has more than that number of blobs, the generator will have a populated next_marker field once it finishes. This marker can be used to create a new generator if more results are desired.

Parameters:

  • container_name (str) – Name of existing container.
  • prefix (str) – Filters the results to return only blobs whose names begin with the specified prefix.
  • num_results (int) – Specifies the maximum number of blobs to return, including all BlobPrefix elements. If the request does not specify num_results or specifies a value greater than 5,000, the server will return up to 5,000 items. Setting num_results to a value less than or equal to zero results in error response code 400 (Bad Request).
  • include (Include) – Specifies one or more additional datasets to include in the response.
  • delimiter (str) – When the request includes this parameter, the operation returns a BlobPrefix element in the result list that acts as a placeholder for all blobs whose names begin with the same substring up to the appearance of the delimiter character. The delimiter may be a single character or a string.
  • marker (str) – An opaque continuation token. This value can be retrieved from the next_marker field of a previous generator object if num_results was specified and that generator has finished enumerating results. If specified, this generator will begin returning results from the point where the previous generator stopped.
  • timeout (int) – The timeout parameter is expressed in seconds.

发现了问题的关键所在:list_blobs每次返回的数量由num_results控制,至多为5,000,如果遍历尚未结束,则返回的结果中会有名为next_marker的field,在下一次调用list_blobs时将上一次调用结果里next_marker的值作为参数marker的值,则可以从上一次遍历结束的位置之后继续获取。于是,问题解决了。

03. 优化

写完脚本心满意足地开始运行时,发现速度感人,全部跑完大约需要60多个小时。由于原来图片是分别存放在多个文件夹中的,自然的想法就是并行化。将原来的任务打散,每个线程获取一个文件夹中的内容,最终大约花费两个小时的时间爬取了约100万个文件。

04. 代码

下面代码完成的功能是利用多线程(每个线程每次爬取的文件数为1000),爬取Azure Blob Container中名称为beijingXXX(XXX为001-100)的文件夹中所有文件的文件名,将结果保存在对应的beijingXXX.txt文件中。

代码也可以在我的Github中查看。

#!/usr/bin/python2
# -*- coding:utf-8 -*-

import multiprocessing
import os
import time

from azure.storage.blob import BlockBlobService


CONFIG = {
    'account_name': 'your_accout_name',
    'account_key': 'your_account_key',
    'container_name': 'your_container_name'
}
NUM_PER_ITERATION = 1000


def list_blobs(folder):
    block_blob_service = BlockBlobService(account_name=CONFIG['account_name'], account_key=CONFIG['account_key'])
    fp = open(folder + '.txt', 'a+')

    counter = 0
    print "{} - {} - COUNT {}".format(folder, time.asctime(time.localtime()), counter)
    blob_list = list()
    generator = block_blob_service.list_blobs(container_name=CONFIG['container_name'], prefix=folder,
                                              num_results=NUM_PER_ITERATION)
    for blob in generator:
        blob_list.append(blob.name)
    fp.write('\n'.join(blob_list) + '\n')
    next_marker = generator.next_marker
    counter += len(blob_list)
    print "{} - {} - COUNT {}".format(folder, time.asctime(time.localtime()), counter)
    while next_marker:
        blob_list = list()
        generator = block_blob_service.list_blobs(container_name=CONFIG['container_name'], prefix=folder,
                                                  num_results=NUM_PER_ITERATION, marker=next_marker)
        for blob in generator:
            blob_list.append(blob.name)
        fp.write('\n'.join(blob_list) + '\n')
        next_marker = generator.next_marker
        counter += len(blob_list)
        print "{} - {} - COUNT {}".format(folder, time.asctime(time.localtime()), counter)
    fp.close()


if __name__ == '__main__':
    print time.asctime(time.localtime()) + " PID: {}".format(os.getpid())
    folders = list()
    for i in range(1, 101):
        folders.append('{}{:0>3}'.format('beijing', i))

    pool = multiprocessing.Pool()
    for i in xrange(len(folders)):
        folder = folders[i]
        print "{} - {} - START...".format(folder, time.asctime(time.localtime()))
        pool.apply_async(list_blobs, args=(folder,))
    pool.close()
    pool.join()
    print time.asctime(time.localtime()) + " Done."